package com.xiaohu.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * spark作业执行的特点：
 * 1、只有遇到行动算子的时候，整个spark作业才会被触发执行
 * 2、遇到几次，执行几次
 *
 *
 * RDD: 弹性分布式数据集
 * 弹性：数据量可大可小
 * RDD类似于容器，但是本身存储的不是数据，是计算逻辑
 * 当遇到行动算子的时候，整个spark作业才会被触发执行，是从第一个RDD开始执行，数据才开始产生流动
 * 数据在RDD之间只是流动关系，不会存储
 * 流动的数据量可以很大，也可以很小，所以称为弹性
 * 分布式：
 * spark本质上它是需要从HDFS中读取数据的，HDFS是分布式，数据block块将来可能会在不同的datanode上
 * RDD中流动的数据，可能会来自不同的datanode中的block块数据
 * 数据集：
 * 计算流动过程中，可以短暂地将RDD看成一个容器，容器中有数据，默认情况下在内存中不会进行存储
 * 后面会有办法将一个RDD的数据存储到磁盘中
 *
 * RDD的5大特性：（面试必问！）
 * 1、RDD是由一系列分区构成
 * 注意：
 * 1）读文件时的minPartitions参数只能决定最小分区数，实际读取文件后的RDD分区数，由数据内容本身以及集群的分布来共同决定的
 * 2）若设置minPartitions的大小比block块数量还少的话，实际上以block块数量来决定分区数
 * 3）产生shuffle的算子调用时，可以传入numPartitions，实际真正改变RDD的分区数，设置多少，最终RDD就有多少分区
 *
 * 2、算子是作用在每一个分区上的
 *
 * 3、RDD与RDD之间存在一些依赖关系
 * 1）窄依赖 前一个RDD中的某一个分区数据只会到后一个RDD中的某一个分区  一对一的关系
 * 2）宽依赖 前一个RDD中的某一个分区数据会进入到后一个RDD中的不同分区中  一对多的关系  也可以通过查看是否产生shuffle来判断
 * 3）整个spark作业会被宽依赖的个数划分若干个stage, Num(stage) = Num(宽依赖) + 1
 * 4）当遇到产生shuffle的算子的时候，涉及到从前一个RDD写数据到磁盘中，从磁盘中读取数据到后一个RDD的现象，
 *    注意：第一次触发执行的时候，磁盘是没有数据的，所以会从第一个RDD产生开始执行
 *    当重复触发相同的执行的时候，对于同一个DAG有向无环图而言，会直接从shuffle之后的RDD开始执行，可以直接从磁盘读取数据。
 * 5）一个阶段中，RDD有几个分区，就会有几个并行task任务
 *
 * 4、kv算子只能作用在kv的RDD上
 *
 * 5、spark会提供最优的任务计算方式，只移动计算，不移动数据。
 *
 *
 *
 *
 *
 */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //创建spark配置文件对象
    val conf: SparkConf = new SparkConf()
    //设置运行模式
    //如果是本地local模式运行的话，需要设置setMaster
    //将来如果是集群进行，将这句话注释即可
    conf.setMaster("local")
    //设置spark作业的名字
    conf.setAppName("wordcount")

    //创建spark core上下文环境对象
    val sc: SparkContext = new SparkContext(conf)
    //===================================================================================

    //读取文件,每次读取一行
    //RDD是spark core中的核心数据结构，将来运行的时候，数据会在RDD之间流动，默认基于内存计算
    val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")
    //    println(s"linesRDD的分区数：${linesRDD.getNumPartitions}")

    //一行数据根据分隔符分割
    val wordRDD: RDD[String] = linesRDD.flatMap(_.split("\\|"))
    //    println(s"wordRDD的分区数：${wordRDD.getNumPartitions}")


    //将每一个单词组成(word,1)
    val kvRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    //    println(s"kvRDD的分区数：${kvRDD.getNumPartitions}")

    //根据键进行分组
    val kvRDD2: RDD[(String, Iterable[(String, Int)])] = kvRDD.groupBy(_._1,numPartitions = 5)
    //    println(s"kvRDD2的分区数：${kvRDD2.getNumPartitions}")

    val resRDD: RDD[(String, Int)] = kvRDD2.map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
    //    println(s"resRDD的分区数：${resRDD.getNumPartitions}")

    val resRDD2: RDD[(String, Int)] = resRDD.map((kv: (String, Int)) => {
      println("==================数加防伪码=====================")
      (kv._1, kv._2)
    })
    //打印
    resRDD2.foreach(println)
//    println("=" * 100)
//    resRDD2.foreach(println)

    //    /**
    //     * 链式调用
    //     */
    //    sc.textFile("spark/data/words.txt")
    //      .flatMap(_.split("\\|"))
    //      .map((_, 1))
    //      .groupBy(_._1)
    //      .map((e: (String, Iterable[(String, Int)])) => (e._1, e._2.size))
    //      .foreach(println)

    //指定的是文件夹的路径
    //spark如果是local本地运行的话，会将本地文件系统看作一个hdfs文件系统
    //    resRDD.saveAsTextFile("spark/data/outdata1")

    while (true){

    }
  }
}
